package comet

import (
	acsclient "acs/comet/client"
	"acs/comet/proto"
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"net"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

// Limits and command names of the control server. Command names are compared
// against the lower-cased first word of each line a client sends.
const (
	MaxControlServerClients        = int32(100)   // acceptConn turns new connections away once this many are counted
	MaxControlServerClientIdleTime = 60           // seconds without a command before handleConn disconnects a client
	ControlServerCmdClose          = "close"      // closes the client's connection
	ControlServerCmdQuit           = "quit"       // handled exactly like "close"
	ControlServerCmdExit           = "exit"       // handled exactly like "close"
	ControlServerCmdDisconnect     = "disconnect" // handled exactly like "close"
	ControlServerCmdHelp           = "help"       // replies with the list of available commands
	ControlServerClientsInfo       = "clients"    // replies with the result of getClientInfo
)

var (
	// ErrorControlServerClosed is returned by Serve after the accept loop
	// has signalled a failure and the listener has been closed.
	ErrorControlServerClosed = errors.New("control server closed")
)

// ControlServer provides the backend control service interface.
type ControlServer struct {
	addr        *net.TCPAddr // resolved TCP address that Serve listens on
	connCounter int32        // connections currently counted against MaxControlServerClients; accessed via sync/atomic
}

// ControlServerResponse is the JSON envelope of every reply sent to a control
// client. In this file Code is 0 for command replies, 1000 for rejected
// connections and empty commands, and 1001 for an idle timeout.
type ControlServerResponse struct {
	Code int         `json:"code"` // numeric status of the reply
	Data interface{} `json:"data"` // reply payload; any JSON-marshalable value
}

// controlServerClientPacket is one parsed command line, passed from the
// connection reader to the connection handler.
type controlServerClientPacket struct {
	cmd  string // first word of the line (everything before the first space)
	data []byte // remainder of the line with surrounding whitespace trimmed
	err  error  // read error associated with the packet, if any
}

// NewControlServer creates a TCP-based backend control server for addr.
//
// addr is resolved with net.ResolveTCPAddr; the resolution error, if any, is
// returned unchanged. The server does not listen until Serve is called.
func NewControlServer(addr string) (*ControlServer, error) {
	resolved, err := net.ResolveTCPAddr("tcp", addr)
	if err != nil {
		return nil, err
	}
	return &ControlServer{addr: resolved}, nil
}

// Serve listens on the server's TCP address and handles control connections
// until accepting fails.
//
// Accepting runs in a separate goroutine (acceptConn); every connection it
// forwards is served by its own handleConn goroutine. A nil connection is the
// accept loop's failure signal: the listener is closed and
// ErrorControlServerClosed is returned. If listening itself fails, that error
// is returned instead.
func (cs *ControlServer) Serve() error {
	listener, err := net.ListenTCP("tcp", cs.addr)
	if err != nil {
		return err
	}
	logger.Infof("Listening on: %v", cs.addr)

	accepted := make(chan net.Conn)
	go cs.acceptConn(listener, accepted)

	for {
		conn := <-accepted
		if conn != nil {
			logger.Infof("Accepted connection from: %v", conn.RemoteAddr())
			go cs.handleConn(conn)
			continue
		}

		logger.Warnf("Control server encountered an error, stopping...")
		listener.Close()
		return ErrorControlServerClosed
	}
}

// acceptConn runs the accept loop for ls and forwards every admitted
// connection to ch.
//
// When Accept fails, the error is logged, a nil connection is sent on ch to
// signal that accepting has stopped, and the loop ends. Connections arriving
// while cs.connCounter has reached MaxControlServerClients are answered with
// error code 1000 and closed without being forwarded.
func (cs *ControlServer) acceptConn(ls *net.TCPListener, ch chan net.Conn) {
	for {
		conn, err := ls.Accept()
		if err != nil {
			logger.Warnf("Connection accept error: %v", err)
			ch <- nil
			return
		}

		if atomic.LoadInt32(&cs.connCounter) >= MaxControlServerClients {
			// Best effort: the client is turned away whether or not the
			// notice can be delivered.
			cs.response(conn, 1000, fmt.Sprintf("Maximum clients[%v] reached!", MaxControlServerClients))
			conn.Close()
			continue
		}

		// Reserve the slot before handing the connection off. The handler
		// releases the slot when it finishes, so counting only after the
		// hand-off would let that release run ahead of its own increment.
		atomic.AddInt32(&cs.connCounter, 1)
		ch <- conn
	}
}

// handleConn serves one control connection until it goes idle or reading
// from it fails.
//
// A reader goroutine (readClientCmds) parses command lines from conn and
// delivers them over an internal channel; every command is dispatched to
// handleCmd in its own goroutine. If no command arrives within
// MaxControlServerClientIdleTime seconds, the client is sent error code 1001.
// A panic inside the loop is recovered and logged.
//
// On return the connection has been closed, the reader goroutine has exited,
// and the connection's slot in cs.connCounter has been released.
func (cs *ControlServer) handleConn(conn net.Conn) {
	packetChan := make(chan *controlServerClientPacket)
	defer func() {
		if fErr := recover(); fErr != nil {
			logger.Errorf("Control server connection handle fatal: %v", fErr)
		}
		// Closing the connection makes any Read or Write the reader is
		// blocked in return an error, so the reader is guaranteed to finish
		// and close packetChan, which ends the drain loop below.
		conn.Close()
		for range packetChan {
			// Discard: nobody is left to act on these packets; receiving
			// only unblocks a reader that is in the middle of a send.
		}
		atomic.AddInt32(&cs.connCounter, -1)
	}()
	go cs.readClientCmds(conn, packetChan)

	for {
		// The idle timer restarts after every delivered command.
		idle := time.After(time.Second * MaxControlServerClientIdleTime)
		select {
		case <-idle:
			cs.response(conn, 1001, "idle timeout")
			return
		case packet, ok := <-packetChan:
			if !ok {
				// The reader exited without reporting an error.
				return
			}
			if packet.err != nil {
				logger.Warnf("Closed client connection: %v", packet.err)
				return
			}
			go cs.handleCmd(conn, packet.cmd, packet.data)
		}
	}
}

// readClientCmds reads newline-terminated command lines from conn, splits
// each into a command word and its argument data, and sends the result on ch.
//
// Empty lines are answered directly with error code 1000 and are not
// forwarded. When reading fails, a final packet carrying the error is sent
// and the function returns. ch is closed on return, so the receiver can tell
// that no further packets will arrive.
func (cs *ControlServer) readClientCmds(conn net.Conn, ch chan *controlServerClientPacket) {
	defer close(ch)
	for {
		line, err := readControlLine(conn)
		if err != nil {
			ch <- &controlServerClientPacket{err: err}
			return
		}
		if len(line) == 0 {
			cs.response(conn, 1000, "empty command.")
			continue
		}
		cmd, data := splitControlCommand(line)
		ch <- &controlServerClientPacket{cmd: cmd, data: data}
	}
}

// readControlLine reads bytes from conn up to the next '\n' and returns them
// without the terminator. Every '\r' is dropped, so both LF and CRLF line
// endings are accepted. A partially read line is discarded when reading
// fails.
func readControlLine(conn net.Conn) ([]byte, error) {
	var (
		line []byte
		buf  [1]byte
	)
	for {
		n, err := conn.Read(buf[:])
		// A Read may return data together with an error, and may return no
		// data without one; only look at the byte when one was delivered.
		if n > 0 {
			switch buf[0] {
			case '\r':
				// Ignored.
			case '\n':
				return line, nil
			default:
				line = append(line, buf[0])
			}
		}
		if err != nil {
			return nil, err
		}
	}
}

// splitControlCommand splits a command line at its first space. The command
// is everything before the space (the whole line if there is none); the data
// is the remainder with surrounding spaces, tabs, and line breaks trimmed.
func splitControlCommand(line []byte) (string, []byte) {
	cmdEnd := bytes.IndexByte(line, ' ')
	if cmdEnd < 0 {
		cmdEnd = len(line)
	}
	return string(line[:cmdEnd]), bytes.Trim(line[cmdEnd:], " \r\n\t")
}

// response sends one reply to the client: a JSON-encoded
// ControlServerResponse holding errCode and data, terminated by CRLF.
//
// The payload and its terminator go out in a single Write call, so replies
// issued concurrently from different goroutines on the same connection are
// not split apart by one another. It returns the marshalling error or the
// write error, whichever occurs first.
func (cs *ControlServer) response(conn net.Conn, errCode int, data interface{}) error {
	payload, err := json.Marshal(ControlServerResponse{Code: errCode, Data: data})
	if err != nil {
		return err
	}
	payload = append(payload, '\r', '\n')
	_, err = conn.Write(payload)
	return err
}

// handleCmd executes a single client command and writes the reply to conn.
//
// cmd is matched case-insensitively. "close", "disconnect", "exit" and "quit"
// close the connection; "help" lists the available commands; "clients"
// replies with getClientInfo's result. Any other command is echoed back
// together with data, and a failure to send that reply is logged. A panic
// raised while handling the command is recovered and logged.
func (cs *ControlServer) handleCmd(conn net.Conn, cmd string, data []byte) {
	defer func() {
		if r := recover(); r != nil {
			logger.Errorf("Control server command handle fatal: %v", r)
		}
	}()

	lowered := strings.ToLower(cmd)
	switch lowered {
	case ControlServerCmdClose, ControlServerCmdDisconnect, ControlServerCmdExit, ControlServerCmdQuit:
		conn.Close()
		return
	case ControlServerCmdHelp:
		commands := []string{
			ControlServerCmdHelp,
			ControlServerCmdClose,
			ControlServerCmdDisconnect,
			ControlServerCmdExit,
			ControlServerCmdQuit,
			ControlServerClientsInfo,
		}
		cs.response(conn, 0, "Available commands: "+strings.Join(commands, " "))
		return
	case ControlServerClientsInfo:
		cs.response(conn, 0, getClientInfo())
		return
	}

	// NOTE(review): unrecognised commands are answered with code 0, the same
	// code used for successful replies — confirm this is intended.
	reply := fmt.Sprintf("unkown command: %v, %s", lowered, data)
	if err := cs.response(conn, 0, reply); err != nil {
		logger.Warnf("Failed to handle command:[%v] [%v]: %v", lowered, string(data), err)
	}
}

// getClientInfo builds a proto.ClientInfo entry (register info, remote
// address, connect time, last register time) for each client that
// clientList.MapFunc passes to the callback, and returns a pointer to the
// resulting slice. The slice is non-nil even when it is empty.
//
// Appends are serialised with a mutex; whether MapFunc runs the callback
// concurrently is not visible from this file.
//
// TODO: performance under large client count.
func getClientInfo() *[]proto.ClientInfo {
	type snapshot struct {
		mu      sync.RWMutex
		entries []proto.ClientInfo
	}

	snap := &snapshot{entries: []proto.ClientInfo{}}

	record := func(c *acsclient.Client, arg interface{}) {
		s := arg.(*snapshot)
		s.mu.Lock()
		defer s.mu.Unlock()

		info := proto.ClientInfo{
			RegisterInfo: c.GetRegisterInfo(),
			IpAddr:       c.GetRemoteAddr(),
			ConnectTime:  c.ConnectTime,
			LastRegTime:  c.LastRegTime,
		}
		s.entries = append(s.entries, info)
	}
	clientList.MapFunc(record, snap)

	return &snap.entries
}
